package gl.java.mq;

import gl.java.umsp.Umsp;
import gl.java.umsp.UmspHeader;
import gl.java.umsp.event.EventServerConfig;
import gl.java.util.JsonUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.json.JSONArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Minimal topic-based publish/subscribe broker implemented as a Netty inbound handler.
 *
 * <p>Inbound {@link UmspHeader} messages are dispatched on their {@code cmd}:
 * <ul>
 *   <li>{@code Umsp.CMD_SUBSCRIBE} - the payload is parsed as a JSON array of topic names; the
 *       sending channel is registered for each topic and packets cached for that topic are
 *       replayed.</li>
 *   <li>{@code Umsp.CMD_PUBLISH} - the payload is parsed into a {@link Packet} and forwarded to
 *       every channel subscribed to its topic; if nobody is subscribed the packet is cached.</li>
 * </ul>
 *
 * <p>The handler is annotated {@code @Sharable} and keeps its state in
 * {@link CopyOnWriteArrayList}s, which are iterated via snapshot iterators.
 */
@ChannelHandler.Sharable
public class MessageQueueService extends SimpleChannelInboundHandler<UmspHeader> {

    /** Packets published while their topic had no subscriber; replayed on a later subscribe. */
    List<Packet> cache = new CopyOnWriteArrayList<Packet>();

    /** Associates a topic name with the channel that subscribed to it. */
    private static class Subscriber {
        final String topic;
        final Channel c;

        public Subscriber(String topic, Channel c) {
            this.topic = topic;
            this.c = c;
        }
    }

    private static final Logger log = LoggerFactory.getLogger(MessageQueueService.class);

    /** All current (topic, channel) subscriptions. */
    private final List<Subscriber> topicSubscriber = new CopyOnWriteArrayList<Subscriber>();
    private final int mPort;
    /** Server created by {@link #start()}; {@code null} until then. */
    NettyServer mNettyServer;

    /**
     * @param port port handed to {@link NettyServer} when {@link #start()} is called
     */
    public MessageQueueService(int port) {
        this.mPort = port;
    }

    /**
     * Drops every subscription held by the closed channel before propagating the event, so
     * that the subscriber list does not grow without bound and later publishes are cached
     * instead of being "delivered" to a dead channel.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        removeSubscribers(ctx.channel());
        super.channelInactive(ctx);
        log.info("[InActive]{}", ctx.channel());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("[Active]{}", ctx.channel());
    }

    /**
     * Handles one inbound message. Any exception raised while parsing or dispatching is logged
     * (with its stack trace) and swallowed so the channel stays open.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param msg                   decoded message; {@code msg.payload} is expected to hold JSON
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, UmspHeader msg) throws Exception {
        // This line is emitted for every inbound message, whatever its command.
        log.info("[SUBSCRIBE] {}", msg);
        try {
            // NOTE(review): decodes with the platform default charset; presumably the peer
            // encodes the same way - confirm before switching to an explicit charset.
            final String json = new String(msg.payload);
            switch (msg.cmd) {
                case Umsp.CMD_SUBSCRIBE: {
                    final Channel channel = channelHandlerContext.channel();
                    final JSONArray topics = new JSONArray(json);
                    for (int i = 0; i < topics.length(); i++) {
                        final String topic = topics.getString(i);
                        // Register each (topic, channel) pair once, otherwise a repeated
                        // subscribe would make the channel receive every message twice.
                        if (!isSubscribed(topic, channel)) {
                            topicSubscriber.add(new Subscriber(topic, channel));
                        }
                        replayCached(topic);
                    }
                    break;
                }
                case Umsp.CMD_PUBLISH: {
                    final Packet packet = JsonUtil.fromJson(json, Packet.class);
                    if (packet == null || packet.topic == null) {
                        log.warn("receive a publish msg without topic :{}", msg);
                        break;
                    }
                    transfer(packet.msg, packet.topic);
                    break;
                }
                default:
                    log.warn("receive a unknown msg :{}", msg);
                    break;
            }
        } catch (Exception e) {
            // Log through SLF4J with the throwable so the stack trace reaches the log output.
            log.warn("fail to handle msg :" + msg, e);
        }
    }

    /** Logs the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[ERROR] mq fire exception:", cause);
        ctx.close();
    }

    /**
     * Forwards a message to every channel subscribed to {@code topic}; caches it when the
     * topic has no subscriber at all. A subscriber whose channel is not writable is skipped
     * with a warning, and the message is not cached for it.
     *
     * @param json  message body
     * @param topic topic the message was published on
     * @return {@code true} if at least one subscriber for the topic exists
     */
    private boolean transfer(String json, String topic) {
        boolean isHasSub = false;
        // for-each uses the list's snapshot iterator, so a concurrent unsubscribe cannot
        // cause an out-of-bounds access the way a size()/get(i) loop could.
        for (Subscriber subscriber : topicSubscriber) {
            if (!topic.equals(subscriber.topic)) {
                continue;
            }
            if (subscriber.c.isWritable()) {
                NettyMsgHelper.sendPublishMsg(new Packet(topic, json), subscriber.c);
            } else {
                log.warn("write fail{}", subscriber.c);
            }
            isHasSub = true;
        }
        if (!isHasSub) {
            cache(json, topic);
        }
        return isHasSub;
    }

    /** Stores a message for later replay to the first subscriber of {@code topic}. */
    private void cache(String json, String topic) {
        cache.add(new Packet(topic, json));
    }

    /** Sends every cached packet of {@code topic} through {@link #transfer} and drops it from the cache. */
    private void replayCached(String topic) {
        final List<Packet> replayed = new ArrayList<Packet>();
        for (Packet cached : cache) {
            if (topic.equals(cached.topic)) {
                transfer(cached.msg, cached.topic);
                replayed.add(cached);
            }
        }
        if (!replayed.isEmpty()) {
            cache.removeAll(replayed);
        }
    }

    /** Tells whether {@code channel} already holds a subscription for {@code topic}. */
    private boolean isSubscribed(String topic, Channel channel) {
        for (Subscriber subscriber : topicSubscriber) {
            if (subscriber.c == channel && topic.equals(subscriber.topic)) {
                return true;
            }
        }
        return false;
    }

    /** Removes every subscription registered for {@code channel}. */
    private void removeSubscribers(Channel channel) {
        final List<Subscriber> gone = new ArrayList<Subscriber>();
        for (Subscriber subscriber : topicSubscriber) {
            if (subscriber.c == channel) {
                gone.add(subscriber);
            }
        }
        if (!gone.isEmpty()) {
            topicSubscriber.removeAll(gone);
        }
    }

    /** Creates a {@link NettyServer}, keeps it in {@link #mNettyServer} and starts it on the configured port. */
    public void start() {
        mNettyServer = new NettyServer();
        mNettyServer.start(mPort, this);
    }

    /**
     * Launches the service on {@code EventServerConfig.ServerPort}, or on the port given as
     * the first command-line argument.
     *
     * @param args optional: {@code args[0]} is parsed as the port number
     */
    public static void main(String[] args) {
        int port = EventServerConfig.ServerPort;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new MessageQueueService(port).start();
    }
}